package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.util.ColumnsUtil;
import cn.com.bonc.util.StructStreamingUtil;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.io.IOException;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.split;

/**
 * Created by ${RQL} on 2019/1/16
 */
public class StructuredStreamingFromKafka {

    /**
     * Entry point: consumes pipe-delimited records from Kafka, projects them into
     * named columns via {@link ColumnsUtil}, computes a sliding-window count per
     * {@code phone}, and streams the result to the console sink.
     *
     * <p>Kafka connection, topics and window/slide durations come from the
     * properties exposed through {@link ConfigurationManager}/{@link Constants}.
     *
     * @param args unused command-line arguments
     * @throws RuntimeException if the streaming query terminates with an error
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("StructuredStreamingWindow")
                .master("local[4]")
                .getOrCreate();

        // Read a streaming Dataset from Kafka.
        // NOTE(review): "includeTimestamp" is a socket-source option; the Kafka
        // source always provides a "timestamp" column, so this is likely a no-op.
        Dataset<Row> lines = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
                .option("subscribe", ConfigurationManager.getProperty(Constants.KAFKA_TOPICS))
                .option("startingOffsets", "earliest")
                .option("includeTimestamp", true)
                .load();

        // Print the raw Kafka schema (key, value, topic, partition, offset, timestamp, ...).
        lines.printSchema();

        // Column definitions are loaded from configuration.
        ColumnsUtil columnsUtil = ColumnsUtil.getInstance();

        // Split the pipe-delimited "value" payload into an array column, then
        // expand it into the configured named columns plus the event timestamp.
        Dataset<Row> rowDataset = lines
                .select(col("value").cast("string"), col("timestamp").cast("Timestamp"))
                .withColumn("tmp", split(col("value"), "[|]"))
                .select(columnsUtil.combineColumns(columnsUtil.getColumns("tmp"), col("timestamp").cast("Timestamp")));
        rowDataset.printSchema();

        String windowDuration = ConfigurationManager.getProperty(Constants.WINDOW_DURATION) + " seconds";
        String slideDuration = ConfigurationManager.getProperty(Constants.SLIDE_DURATION) + " seconds";

        // Group by sliding time window and phone number, counting rows per group.
        Dataset<Row> windowedCounts = rowDataset.groupBy(
                functions.window(rowDataset.col("timestamp"), windowDuration, slideDuration),
                rowDataset.col("phone")
        ).count().orderBy("window");

        // "complete" mode re-emits the full aggregation result on every trigger,
        // which is required when sorting an aggregated streaming Dataset.
        StreamingQuery query = windowedCounts.writeStream()
                .outputMode("complete")
                .format("console")
                .option("truncate", "false")
                .start();

        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            // Propagate with the cause preserved instead of swallowing the failure;
            // a silently-exiting driver would mask streaming errors.
            throw new RuntimeException("Streaming query terminated with an error", e);
        } finally {
            spark.stop();
        }
    }
}
